/**
 * FileName: StreamingAggregate
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/18 16:56
 * Description: 聚合累加
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.ColumnsUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class StreamingAggregate {
    public static void main(String[] args) {
        /**
         * SparkSession 是 Spark SQL 的入口。
         * 使用 Dataset 或者 Datafram 编写 Spark SQL 应用的时候，第一个要创建的对象就是 SparkSession。
         */
        SparkSession sparkSession = SparkSession
                .builder()
                .getOrCreate();
        /**
         * 从kafka读取数据，为流式查询创建Kafka源
         */
        Dataset<Row> kafkaDataset = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
                .option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
                .option("startingOffsets", ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
                .load();

        /**
         * 将value列分割为多列，分割方法由column-mapping决定
         */
        Dataset<Row> columnDataset = ColumnsUtil
                .getInstance()
                .getMultiColumnDataset(kafkaDataset, "value");

        /**
         * 硬编码方式实现聚合计数
         *  min() max() sum() avg() count() etc.
         */
        columnDataset.selectExpr("phone")
                .groupBy("phone")
                .count();
        /**
         *  使用SQL进行数据的聚合
         */
        columnDataset.createOrReplaceTempView("kafka");
        Dataset<Row> result = sparkSession.sql("select phone,count(phone) count from kafka group by phone");

        /**
         * 打印结果输出到控制台,注意输出模式
         */
        StreamingQuery query = result
                .writeStream()
                .format("console")
                .outputMode(OutputMode.Complete())
                .option("truncate", "false")
                .start();

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}
